import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend


/**
 * Streaming word count that reads lines from a socket, counts words, and
 * keeps its state in a RocksDB state backend with checkpoints stored on HDFS.
 */
object wordcount_increstate {

  /**
   * Configures checkpointing, the restart strategy and the state backend,
   * builds the word-count pipeline, prints the execution plan, and then
   * submits the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 5000 ms. Original author's note: the restart strategy
    // only comes into play once checkpointing is enabled.
    streamEnv.enableCheckpointing(5000)

    // On failure, restart at most 3 times, waiting 2000 ms between attempts.
    streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000))

    // Keep the externalized checkpoints when the job is cancelled.
    val checkpointConfig = streamEnv.getCheckpointConfig
    checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Alternative backend kept for reference:
    //   streamEnv.setStateBackend(new FsStateBackend("hdfs://Desktop:9000/tmp/flinkck"))
    // The second constructor argument of RocksDBStateBackend switches on
    // incremental checkpointing.
    val incrementalCheckpoints = true
    val rocksDbBackend = new RocksDBStateBackend("hdfs://Desktop:9000/tmp/flinkck", incrementalCheckpoints)
    streamEnv.setStateBackend(rocksDbBackend)

    // Text source; feed it locally with: nc -lk 9999
    val lines = streamEnv.socketTextStream("Desktop", 9999)

    // Split each line on single spaces, pass every token through TMap,
    // pair it with an initial count of 1, then key by the token (tuple
    // field 0) and keep a running sum of the counts (tuple field 1).
    val tokens = lines.flatMap(_.split(" "))
    val mappedTokens = tokens.map(new TMap())
    val tokenWithOne = mappedTokens.map(token => (token, 1))
    val runningCounts = tokenWithOne.keyBy(0).sum(1)
    runningCounts.print()

    // Dump the execution plan to stdout before submitting the job.
    val executionPlan = streamEnv.getExecutionPlan
    System.out.println(executionPlan)

    streamEnv.execute("stream word count job")
  }
}
